package com.calabar.put;

import com.alibaba.fastjson.JSON;
import com.calabar.entity.DataPutEntity;
import com.calabar.entity.OpenTsdbDataEntity;
import com.calabar.query.QuerySubStdCode;
import com.calabar.util.HttpClientPost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;

import java.util.*;

/**
 * <p/>
 * <li>Description: 数据写入实现类</li>
 * <li>@author: zijian.wu</li>
 * <li>Date: 2018/4/20 14:14</li>
 */
@Repository
public class DataPut implements IDataPut {
    /**
     * 日志记录
     */
    private static final Logger LOGGER = LogManager.getLogger(DataPut.class);

    /**
     * 查询子系统编码的对象
     */
    @Autowired
    private QuerySubStdCode querySubStdCode;

    /**
     * opentsdb的URL_PUT配置文件获取
     */
    @Value("${opentsdb.url.put}")
    private String URL_PUT;

    /**
     * Hbase表名
     */
    @Value("${hbase.table-name}")
    private String TABLE_NAME;

    /**
     * 列族名
     */
    @Value("${hbase.column-families}")
    private String COLUMN_FAMALIES;

    /**
     * 时序数据库的PLANT plt_code
     */
    private final static String PLANT_TAG_KEY = "plt_code";

    /**
     * 时序数据库的SET set_code
     */
    private final static String SET_TAG_KEY = "set_code";
    /**
     * 时序数据库存储的原始值后缀
     */
    private final static String VALUE_SUFFIX = "_src_value";

    /**
     * 时序数据库存储的原始质量标签后缀
     */
    private final static String QUALITY_SUFFIX = "_src_data_quality";


    @Override
    public void put(List<DataPutEntity> data) throws Exception {
        Set<String> stdCodesSet = new HashSet<>();

        for (DataPutEntity piece : data) {
            stdCodesSet.add(piece.getStdCode());
        }

        List<String> stdCodes = new ArrayList<>(stdCodesSet);

        Map<String, String> stdMapSub = querySubStdCode.findStdMapSub(stdCodes);

        put2tsdb(data, stdMapSub);

        send2Hbase(data, stdMapSub);

    }

    public boolean put2tsdb(List<DataPutEntity> data, Map<String, String> stdMapSub) throws Exception {

        List<OpenTsdbDataEntity> putData = new ArrayList<>();
        //转为opentsdb标准数据格式
        for (DataPutEntity onePieceData : data) {

            //获取属性
            //测点编码
            String stdCode = onePieceData.getStdCode();
            //电厂编码与机组编码
            String pltCode = onePieceData.getPltCode();
            String setCode = onePieceData.getSetCode();
            //时间戳
            Long timestamp = onePieceData.getTimestamp();
            //值
            Float value = onePieceData.getValue();
            //质量标签
            Integer quality = onePieceData.getQuality();

            //值数据
            OpenTsdbDataEntity valueData = new OpenTsdbDataEntity();

            valueData.setMetric(stdMapSub.get(stdCode) + VALUE_SUFFIX);

            HashMap<String, String> tags = new HashMap<>();
            tags.put(PLANT_TAG_KEY, pltCode);
            tags.put(SET_TAG_KEY, setCode);
            valueData.setTags(tags);

            valueData.setTimestamp(timestamp);

            valueData.setValue(value);

            //质量标签数据
            OpenTsdbDataEntity qualityData = new OpenTsdbDataEntity();

            qualityData.setMetric(stdMapSub.get(stdCode) + QUALITY_SUFFIX);

            HashMap<String, String> qualityTags = new HashMap<>();
            qualityTags.put(PLANT_TAG_KEY, pltCode);
            qualityTags.put(SET_TAG_KEY, setCode);
            qualityData.setTags(qualityTags);

            qualityData.setTimestamp(timestamp);

            qualityData.setValue(quality);

            putData.add(valueData);
            putData.add(qualityData);

        }
        String dataJson = JSON.toJSONString(putData);

        LOGGER.info(dataJson);

        boolean isSuccessful = HttpClientPost.postReturnBoolean(URL_PUT, dataJson);

        return isSuccessful;
    }

    public void send2Hbase(List<DataPutEntity> data, Map<String, String> stdMapSub) throws Exception {
        //列族
        String[] columnFamilies = COLUMN_FAMALIES.split(";");
        //hbase格式数据
        Map<String, List<Map<String, Object>>> formattedData = new HashMap<>();

        for (DataPutEntity onePieceData : data) {
            //获取属性
            //测点编码
            String stdCode = onePieceData.getStdCode();
            //电厂编码与机组编码
            String pltCode = onePieceData.getPltCode();
            String setCode = onePieceData.getSetCode();
            //时间戳
            Long timestamp = onePieceData.getTimestamp();
            //值
            Float value = onePieceData.getValue();
            //质量标签
            Integer quality = onePieceData.getQuality();

            //子系统编码
            String subCode = stdMapSub.get(stdCode);
            //子系统编码后六位
            String subCodeEnd = subCode.substring(subCode.length() - 6, subCode.length());

            //Rowkey 子系统编码后6位（6）+电厂编码（4）+机组编码（2）+采集时间戳（10）
            String rowkey = subCodeEnd + pltCode + setCode + timestamp;

            //列族数据
            List<Map<String, Object>> cfData = new ArrayList<>();

            //列与单元
            Map<String, Object> columnAndCell = new HashMap<>();
            columnAndCell.put("std", pltCode + setCode + subCode);
            columnAndCell.put("sutd", subCode);
            columnAndCell.put("stt", timestamp);
            columnAndCell.put("srv", value);
            columnAndCell.put("sdq", quality);
            columnAndCell.put("pc", pltCode);
            columnAndCell.put("sc", setCode);

            cfData.add(columnAndCell);

            formattedData.put(rowkey, cfData);
        }


        SendToHbase.putDataList(TABLE_NAME, columnFamilies, formattedData);
    }
}
